Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(parquet): Add next_row_group API for ParquetRecordBatchStream #6907

Merged
merged 2 commits into from
Dec 24, 2024

Conversation

Xuanwo
Copy link
Member

@Xuanwo Xuanwo commented Dec 20, 2024

Which issue does this PR close?

Rationale for this change

Add async fn next_row_group() for ParquetRecordBatchStream so that users can fecth row groups based on their needs and decode the data seperately.

This PR marks the first step in further decoupling the I/O and decoding processes of Parquet reading.

What changes are included in this PR?

Add new API:

pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> { ... }

Are there any user-facing changes?

Yes.

@github-actions github-actions bot added the parquet Changes to the parquet crate label Dec 20, 2024
/// - `Ok(None)` if the stream has ended.
/// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
/// - `Ok(Some(reader))` which holds all the data for the row group.
pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if next_row_group is the best name, open to other options.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a good, clear name as it clearly explains what it does

Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this PR does have a certain elegant simplicity to it, however, it doesn't really solve the separation of IO and compute given that reader_factory.read_factory potentially performs CPU-bound parquet decoding as part of late materialization / filter pushdown. It also has no ability to be parallelised.

Given that this isn't adding a host of additional complexity, I don't object to merging this in, but I wanted to flag that a solution to that problem likely will require something a bit different.

pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
loop {
match &mut self.state {
StreamState::Decoding(_) | StreamState::Reading(_) => unreachable!(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should probably return an error saying not to mix polling the stream and using this API

@alamb
Copy link
Contributor

alamb commented Dec 20, 2024

So this PR does have a certain elegant simplicity to it, however, it doesn't really solve the separation of IO and compute given that reader_factory.read_factory potentially performs CPU-bound parquet decoding as part of late materialization / filter pushdown.

I agree it doesn't solve (nor claim to) separting CPU and compute. Also, neither does what is currently in the repo

It also has no ability to be parallelised.

I don't understand the assertion that this can't be parallelized. Do you mean there is now way to have concurrent outstanding fetch requests?

As I understand it, once the reader is returned, reading from the returned stream actually decodes the parquet data so this PR would allow the next IO to be interleaved with actually decoding the data.

Given that this isn't adding a host of additional complexity, I don't object to merging this in, but I wanted to flag that a solution to that problem likely will require something a bit different.

I think we could support concurrent download / decode on multiple row groups of the same file today by creating multiple ParquetRecordBatchStream (each for a different row group / set of row groups) 🤔 Maybe it doesn't need a new API

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much @Xuanwo

After @tustvold 's comments about error vs panic are addressed, I think this PR looks good to me.

@masonh22 can you give this PR a look and see if it would work for your usecase?

@alamb
Copy link
Contributor

alamb commented Dec 20, 2024

FYI @etseidl

@tustvold
Copy link
Contributor

tustvold commented Dec 20, 2024

I agree it doesn't solve (nor claim to) separting CPU and compute. Also, neither does what is currently in the repo

Right this was in response to #6676 (comment) which instigated this PR.

I'm mostly wary of merging an API if we're going to have to replace it in order to meet the desired use-case

I don't understand the assertion that this can't be parallelized. Do you mean there is now way to have concurrent outstanding fetch requests?

The PR attests to be related to #5522 which concerns this

Edit:

I think we could support concurrent download / decode on multiple row groups of the same file today by creating multiple ParquetRecordBatchStream (each for a different row group / set of row groups) 🤔 Maybe it doesn't need a new API

Yes, which is what Datafusion does today. It is somewhat arcane to get it to work, but is documented here

@alamb
Copy link
Contributor

alamb commented Dec 20, 2024

'm mostly wary of merging an API if we're going to have to replace it in order to meet the desired use-case

I agree if we have some actual alternative in mind we should evaluate that prior to merging this PR

It seems to me this PR makes it possible to interleave IO and decode which the current API does not.

I agree it does not address the other parts of #5522 (like parallel decode of columns, for example). I updated the description to say it closed #6559

@alamb
Copy link
Contributor

alamb commented Dec 20, 2024

IN my opinion, even if we add some newer low level API there is still value to this higher one that permits interleaved download and decode, as described on

@masonh22
Copy link

I like this! This will work for what I need.

Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy for this to be merged, unless @Xuanwo it doesn't meet your requirements and you plan to add something different instead

@Xuanwo
Copy link
Member Author

Xuanwo commented Dec 23, 2024

Thank you, @alamb and @tustvold, for the review. I will address the error-handling issues, and then we can proceed with merging!

@alamb
Copy link
Contributor

alamb commented Dec 24, 2024

fyi @thinkharderdev

@alamb
Copy link
Contributor

alamb commented Dec 24, 2024

I think this is an improvement so merging it in. If others have additional ideas on other improvements or changes please open another PR.

Thanks again @Xuanwo @tustvold and @masonh22

@alamb alamb merged commit 10cf03c into apache:main Dec 24, 2024
16 checks passed
@Xuanwo Xuanwo deleted the add-next-row-group branch December 24, 2024 15:26
CurtHagenlocher pushed a commit to CurtHagenlocher/arrow-rs that referenced this pull request Dec 28, 2024
…pache#6907)

* feat(parquet): Add next_row_group API for ParquetRecordBatchStream

Signed-off-by: Xuanwo <[email protected]>

* chore: Returning error instead of using unreachable

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
parquet Changes to the parquet crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

ParquetRecordBatchStream API to fetch the next row group while decoding
4 participants